package com.app.server;

import java.sql.Connection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;

import com.app.db.DaoUtil;
import com.app.store.AbstractMySqlStore;

/**
 * Periodic task that drains the update queue of one table.
 *
 * <p>Each run pops keys from the Redis set stored under {@code queueKey}. For every
 * popped key it reads and deletes the corresponding Redis hash inside a single
 * MULTI/EXEC transaction and hands the field/value pairs to
 * {@link AbstractMySqlStore#putFromUpdateQueue}. When that hand-off fails, the
 * hash and its queue entry are written back to Redis so a later run can retry.
 *
 * <p>Unless {@code stop} is set, the task re-schedules itself on the executor
 * after {@code UPDATE_INTERVAL} seconds at the end of every run.
 */
public class UpdateQueueRunnable extends QueueRunnable {

	/**
	 * @param tableName name of the table whose update queue is drained
	 * @param store     store that generates the queue key and receives the updates
	 * @param service   executor on which this task re-schedules itself
	 */
	public UpdateQueueRunnable(String tableName,AbstractMySqlStore store,ScheduledExecutorService service) {
		super(tableName,store,store.genUpdateQueueKeyLevel1(tableName),service);
	}

	/**
	 * Drains the queue until it is empty or {@code stop} is set, then releases the
	 * Redis and JDBC resources and either re-schedules this task or marks it done.
	 */
	@Override
	public void run() {
		Connection conn = null;
		Jedis jedis = null;
		String updateKey = null;
		try {
			jedis = client.getJedis();
			conn = pool.getConnection();
			while (!stop && (updateKey = jedis.spop(queueKey)) != null) {
				Transaction trans = null;
				LinkedHashMap<String, String> map = null;
				try {
					// Read and delete the hash in one transaction so no update written
					// between the two commands is silently dropped.
					trans = jedis.multi();
					Response<Map<String, String>> rsp = trans.hgetAll(updateKey);
					trans.del(updateKey);
					trans.exec();
					// From here on the transaction is finished and must not be discarded.
					trans = null;
					Map<String, String> kv = rsp.get();
					if(kv == null || kv.isEmpty()) {
						continue;
					}
					map = new LinkedHashMap<String, String>(kv);
					store.putFromUpdateQueue(tableName, map, conn);
					errorTimes = 0;
					if(logger.isDebugEnabled()) {
						logger.debug("update---"+updateKey);
					}
				} catch (Exception e) {
					logger.error(e.getMessage() + " ,updateQueue tableName:" + tableName,e);
					// Count each failed key exactly once.
					errorTimes ++;
					if(trans != null) {// cancel the still-open transaction
						trans.discard();
						trans = null;
					}
					// map is only non-null after the hash was already deleted from Redis,
					// so put the data and its queue entry back for a later retry.
					if(map != null && updateKey != null) {
						jedis.hmset(updateKey, map);
						jedis.sadd(queueKey, updateKey);
					}
				}
			}
		} catch (Exception e) {
			logger.error(e.getMessage() + " ,updateQueue tableName:" + tableName,e);
			errorTimes ++;
			if(errorTimes > 10) {
				logger.error(queueKey+" errorTimes:"+errorTimes);
			}
		} finally {
			// Nested finally blocks: a failure while releasing one resource must not
			// prevent releasing the other, nor skip re-scheduling of this task.
			try {
				DaoUtil.close(conn);
				conn = null;
			} finally {
				try {
					client.returnJedis(jedis);
					jedis = null;
				} finally {
					if(!stop) {
						service.schedule(this, UPDATE_INTERVAL, TimeUnit.SECONDS);
					}
					else {
						done = true;
						logger.info(queueKey+" is done......");
					}
				}
			}
		}
	}
}